{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "*This notebook is under construction; please see the [pipeline](../getting_started/2_Pipeline.ipynb) and [nyc_taxi](https://examples.pyviz.org/nyc_taxi/nyc_taxi.html) notebooks for extensive examples of working with points.  For now, this section only includes information about spatially indexed datasets.*"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Spatial indexing\n",
    "\n",
    "In most cases, Datashader must iterate through your entire dataset to render any plot, because it cannot assume the datapoints have been sorted in any particular order. Thus, the aggregation performance is dependent on the number of datapoints in your entire dataframe, not just those in the current viewport (x and y range). If you have a large dataset covering a wide area and you want to support fast local operations (e.g. if you have data at a global level but analysis is typically done in small local regions), Datashader supports optionally storing your data in a spatially indexed format.  This format makes it very fast to create a new dataframe with only the points from a restricted region, without even needing to bring the other data points into main memory.  For a detailed description of the spatial indexing approach used by Datashader, and performance results on a real-world dataset, please see the [*Spatial Partitioning of Dask DataFrames using Hilbert Curves*](https://anaconda.org/jonmmease/spatial-partitioning-for-datashader-points-rendering/notebook) notebook.\n",
    "\n",
    "For instance, if you start with an unordered dataframe `df` of 2 million points:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd, numpy as np, dask.dataframe as dd, datashader as ds\n",
    "import datashader.transfer_functions as tf\n",
    "from collections import OrderedDict as odict\n",
    "\n",
    "num=2000000\n",
    "np.random.seed(1)\n",
    "\n",
    "dists = {cat: pd.DataFrame(odict([('x',np.random.normal(x,s,num)), \n",
    "                                  ('y',np.random.normal(y,s,num)), \n",
    "                                  ('val',val), \n",
    "                                  ('cat',cat)]))      \n",
    "         for x,  y,  s,  val, cat in \n",
    "         [(  2,  2, 0.03, 10, \"d1\"), \n",
    "          (  2, -2, 0.10, 20, \"d2\"), \n",
    "          ( -2, -2, 0.50, 30, \"d3\"), \n",
    "          ( -2,  2, 1.00, 40, \"d4\"), \n",
    "          (  0,  0, 3.00, 50, \"d5\")] }\n",
    "\n",
    "df = pd.concat(dists,ignore_index=True)\n",
    "df[\"cat\"]=df[\"cat\"].astype(\"category\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can sort it spatially on `x` and `y` using:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import datashader.spatial.points as dsp\n",
    "%time dsp.to_parquet(df, 'sorted.parq', 'x', 'y', shuffle='disk', npartitions=32)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This process involves sorting and then partitioning the entire dataset and then writing the resulting partitions to a Parquet file (which requires the `fastparquet` library).  This is a relatively expensive operation and will take some time, e.g. 5-10 minutes for a 100-million-point dataframe on a 4-core laptop with 16GB of RAM.  After this process completes you can load the parquet file as a `SpatialPointsFrame`, and use it to quickly access subsets of the dataset for a region of interest."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "sframe = dsp.read_parquet('sorted.parq')\n",
    "sframe"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As you can see, the full dataset is split across 32 partitions (this number can be customized using the `npartitions` argument to the `dsp.to_parquet` function); the rest of the information is not known because the dataset has not actually been read in yet. \n",
    "\n",
    "Because `SpatialPointsFrame` is a subclass of `dask.dataframe.DataFrame`, you can use `sframe` anywhere that a Dask `DataFrame` is accepted.  What makes the `SpatialPointsFrame` class particularly useful, however, is an additional `spatial_query` method that you can use to request only the subset of partitions that may contain points that overlap with a rectangular query region.\n",
    "\n",
    "You can use the `x_range` and `y_range` properties of the `sframe.spatial` accessor to check the extents of the entire dataset.  These are metadata that were calculated when `'sorted.parq'` was created, so accessing this information does not require loading any of the dataset from disk."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print('x_range: ', sframe.spatial.x_range)\n",
    "print('y_range: ', sframe.spatial.y_range)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here are the range extents of 4 progressively smaller viewports. Notice that the first viewport encompasses the entire dataset. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "x_ranges=[(-20, 20), (1, 11), (1.5, 2.5), (2.04, 2.1)]\n",
    "y_ranges=[(-20, 20), (-5, 5), (1.5, 2.5), (2.04, 2.1)]\n",
    "ranges = list(zip(x_ranges, y_ranges))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, let's define a `create_image` helper function that inputs the range extents of a viewport, and whether or not to utilize spatial indexing, and performs aggregation and shading.  The title of the resulting image includes the number of partitions that were processed and the average aggregation time."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Read as standard (non-spatial) Dask dataframe\n",
    "frame = dd.read_parquet('sorted.parq')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def create_image(x_range, y_range, use_spatial=True):\n",
    "    \"\"\"Given a range, time multiple aggregations and average them\"\"\"\n",
    "    \n",
    "    df = frame if not use_spatial else sframe.spatial_query(x_range, y_range)\n",
    "    \n",
    "    canvas = ds.Canvas(x_range=x_range, y_range=y_range)\n",
    "    agg = canvas.points(df,'x','y', agg=ds.count_cat('cat'))\n",
    "    timing = %timeit -n 1 -r 2 -o canvas.points(df,'x','y', agg=ds.count_cat('cat'))\n",
    "    title = \"{} partitions, {:.3f} sec\".format(df.npartitions, timing.average)\n",
    "\n",
    "    return tf.shade(agg, name=title)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Without spatial indexing, the aggregation time for each of these viewports is nearly constant because the data from all 32 partitions must be processed regardless of how small the viewport is."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tf.Images(*[create_image(x_range, y_range, use_spatial=False) for x_range, y_range in ranges])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "With spatial indexing, however, smaller viewports require processing fewer partitions, resulting in significant runtime reductions."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tf.Images(*[create_image(x_range, y_range, use_spatial=True) for x_range, y_range in ranges])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Spatial indexing in Holoviews\n",
    "The `create_image` function manually calls `spatial_query` and then passes the resulting Dask `DataFrame` to `Canvas.points`.  If the number of partitions is not needed, then the `SpatialPointsFrame` object can be passed to `Canvas.points` directly, in which case datashader will call `spatial_query` internally.\n",
    "\n",
    "Because of this, it is possible to take advantage of spatial indexing when datashading Holoviews `Points` elements by simply constructing a `Points` elements from a `SpatialPointsFrame` object."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import holoviews as hv\n",
    "from holoviews.operation.datashader import datashade\n",
    "hv.extension('bokeh')\n",
    "\n",
    "datashade(hv.Points(sframe, kdims=['x', 'y'], vdims=['cat']), aggregator=ds.count_cat('cat')).opts(width=400, height=400)"
   ]
  }
 ],
 "metadata": {
  "language_info": {
   "name": "python",
   "pygments_lexer": "ipython3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
